* Author: Colin Walters <walters@verbum.org>
*/
-/*
- * See:
- * https://mail.gnome.org/archives/ostree-list/2012-August/msg00021.html
- *
- * First, we synchronously fetch all requested refs, and resolve them
- * to SHA256 commit checksums.
- *
- * Now, there are two threads involved here. First, there's the
- * calling thread; we create a temporary #GMainContext, and iterate
- * it. This thread performs all HTTP requests.
- *
- * The calling thread communicates with the "metadata scanning"
- * thread. The purpose of the metadata thread is to avoid blocking
- * the main thread while reading from the repository. If a
- * transaction is interrupted for example, the next run will need to
- * lstat() each loose object, which could easily be 60000 or more.
- *
- * The two threads pass messages back and forth over queues. The deep
- * complexity in this code is determining when a pull process is
- * complete. When the main thread completes fetching a metadata
- * object, it passes it over to the metadata thread, which may in turn
- * queue more work for the main thread. That in turn may generate
- * more work for the metadata thread, etc.
- *
- * Work completion is presently done via sending special _IDLE message
- * down the queue; if both threads are idle, the main thread tells the
- * metadata thread to shut down, and then proceeds to stop iterating
- * the main context.
- *
- * There is still a race condition here. See
- * https://bugzilla.gnome.org/show_bug.cgi?id=706456
- */
-
#include "config.h"
#include "ostree.h"
#include "ostree-fetcher.h"
#include "otutil.h"
-typedef struct {
- enum {
- PULL_MSG_IDLE,
- PULL_MSG_FETCH,
- PULL_MSG_FETCH_DETACHED_METADATA,
- PULL_MSG_SCAN,
- PULL_MSG_QUIT
- } t;
- union {
- guint idle_serial;
- GVariant *item;
- } d;
-} PullWorkerMessage;
-
typedef struct {
OstreeRepo *repo;
OstreeRepoPullFlags flags;
OstreeAsyncProgress *progress;
gboolean transaction_resuming;
- volatile gint n_scanned_metadata;
+ enum {
+ OSTREE_PULL_PHASE_FETCHING_REFS,
+ OSTREE_PULL_PHASE_FETCHING_OBJECTS
+ } phase;
+ gint n_scanned_metadata;
SoupURI *fetching_sync_uri;
gboolean gpg_verify;
- GThread *metadata_thread;
- GMainContext *metadata_thread_context;
- GMainLoop *metadata_thread_loop;
GPtrArray *static_delta_metas;
- OtWaitableQueue *metadata_objects_to_scan;
- OtWaitableQueue *metadata_objects_to_fetch;
GHashTable *scanned_metadata; /* Maps object name to itself */
GHashTable *requested_metadata; /* Maps object name to itself */
GHashTable *requested_content; /* Maps object name to itself */
- guint checking_metadata_scan_complete : 1;
- guint metadata_scan_complete : 1;
- gboolean was_idle;
- guint idle_serial;
guint n_outstanding_metadata_fetches;
guint n_outstanding_metadata_write_requests;
guint n_outstanding_content_fetches;
...) G_GNUC_NULL_TERMINATED;
static gboolean scan_one_metadata_object (OtPullData *pull_data,
- const guchar *csum,
+ const char *csum,
OstreeObjectType objtype,
guint recursion_depth,
GCancellable *cancellable,
GError **error);
-static gboolean scan_one_metadata_object_v_name (OtPullData *pull_data,
- GVariant *object,
- GCancellable *cancellable,
- GError **error);
+static gboolean scan_one_metadata_object_c (OtPullData *pull_data,
+ const guchar *csum,
+ OstreeObjectType objtype,
+ guint recursion_depth,
+ GCancellable *cancellable,
+ GError **error);
static SoupURI *
suburi_new (SoupURI *base,
guint64 bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher);
guint fetched = pull_data->n_fetched_metadata + pull_data->n_fetched_content;
guint requested = pull_data->n_requested_metadata + pull_data->n_requested_content;
- guint n_scanned_metadata = g_atomic_int_get (&pull_data->n_scanned_metadata);
+ guint n_scanned_metadata = pull_data->n_scanned_metadata;
g_assert (pull_data->progress);
return TRUE;
}
-static PullWorkerMessage *
-pull_worker_message_new (int msgtype, gpointer data)
-{
- PullWorkerMessage *msg = g_new (PullWorkerMessage, 1);
- msg->t = msgtype;
- switch (msgtype)
- {
- case PULL_MSG_IDLE:
- msg->d.idle_serial = GPOINTER_TO_UINT (data);
- break;
- case PULL_MSG_SCAN:
- case PULL_MSG_FETCH:
- case PULL_MSG_FETCH_DETACHED_METADATA:
- msg->d.item = data;
- break;
- case PULL_MSG_QUIT:
- break;
- }
- return msg;
-}
-
static void
throw_async_error (OtPullData *pull_data,
GError *error)
throw_async_error (pull_data, error);
- /* This is true in the phase when we're fetching refs */
- if (pull_data->metadata_objects_to_scan == NULL)
+ switch (pull_data->phase)
{
+ case OSTREE_PULL_PHASE_FETCHING_REFS:
if (!pull_data->fetching_sync_uri)
g_main_loop_quit (pull_data->loop);
- return;
- }
-
- if (pull_data->was_idle && !current_idle)
- {
- /* We transitioned to !idle */
- g_debug ("pull: No longer idle");
- pull_data->idle_serial++;
- pull_data->was_idle = FALSE;
- }
- else if (!pull_data->was_idle && current_idle)
- {
- pull_data->was_idle = TRUE;
- g_debug ("Sending new MSG_IDLE with serial %u", pull_data->idle_serial);
- ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
- pull_worker_message_new (PULL_MSG_IDLE, GUINT_TO_POINTER (pull_data->idle_serial)));
- }
-
- if (pull_data->metadata_scan_complete && current_idle)
- {
- g_debug ("pull: metadata scan complete and idle, exiting mainloop");
-
- g_main_loop_quit (pull_data->loop);
+ break;
+ case OSTREE_PULL_PHASE_FETCHING_OBJECTS:
+ if (current_idle)
+ {
+ g_debug ("pull: idle, exiting mainloop");
+
+ g_main_loop_quit (pull_data->loop);
+ }
+ break;
}
}
return ret;
}
+static void
+enqueue_one_object_request (OtPullData *pull_data,
+ const char *checksum,
+ OstreeObjectType objtype,
+ gboolean is_detached_meta);
+
static gboolean
scan_dirtree_object (OtPullData *pull_data,
const char *checksum,
if (!file_is_stored && !g_hash_table_lookup (pull_data->requested_content, file_checksum))
{
g_hash_table_insert (pull_data->requested_content, file_checksum, file_checksum);
-
- g_debug ("queued fetch of content %s", file_checksum);
- ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
- pull_worker_message_new (PULL_MSG_FETCH,
- ostree_object_name_serialize (file_checksum, OSTREE_OBJECT_TYPE_FILE)));
- file_checksum = NULL; /* Transfer ownership to hash */
+ enqueue_one_object_request (pull_data, file_checksum, OSTREE_OBJECT_TYPE_FILE, FALSE);
+ file_checksum = NULL; /* Transfer ownership */
}
}
if (!ot_util_filename_validate (dirname, error))
goto out;
- if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (tree_csum),
- OSTREE_OBJECT_TYPE_DIR_TREE, recursion_depth + 1,
- cancellable, error))
+ if (!scan_one_metadata_object_c (pull_data, ostree_checksum_bytes_peek (tree_csum),
+ OSTREE_OBJECT_TYPE_DIR_TREE, recursion_depth + 1,
+ cancellable, error))
goto out;
- if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (meta_csum),
- OSTREE_OBJECT_TYPE_DIR_META, recursion_depth + 1,
- cancellable, error))
+ if (!scan_one_metadata_object_c (pull_data, ostree_checksum_bytes_peek (meta_csum),
+ OSTREE_OBJECT_TYPE_DIR_META, recursion_depth + 1,
+ cancellable, error))
goto out;
}
check_outstanding_requests_handle_error (pull_data, local_error);
}
-static void
-note_metadata_not_complete (OtPullData *pull_data)
-{
- if (pull_data->metadata_scan_complete)
- g_debug ("pull: Transition metadata scan complete -> not complete");
- pull_data->metadata_scan_complete = FALSE;
-}
-
static void
on_metadata_writed (GObject *object,
GAsyncResult *result,
goto out;
}
- note_metadata_not_complete (pull_data);
- ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
- pull_worker_message_new (PULL_MSG_SCAN,
- g_variant_ref (fetch_data->object)));
+ if (!scan_one_metadata_object_c (pull_data, csum, objtype, 0,
+ pull_data->cancellable, error))
+ goto out;
+
out:
pull_data->n_outstanding_metadata_write_requests--;
(void) gs_file_unlink (fetch_data->temp_path, NULL, NULL);
check_outstanding_requests_handle_error (pull_data, local_error);
}
-static void
-enqueue_one_object_request (OtPullData *pull_data,
- GVariant *object_name,
- gboolean is_detached_meta);
-
static void
meta_fetch_on_complete (GObject *object,
GAsyncResult *result,
{
/* There isn't any detached metadata, just fetch the commit */
g_clear_error (&local_error);
- enqueue_one_object_request (pull_data, fetch_data->object, FALSE);
+ enqueue_one_object_request (pull_data, checksum, objtype, FALSE);
}
goto out;
pull_data->cancellable, error))
goto out;
- enqueue_one_object_request (pull_data, fetch_data->object, FALSE);
+ enqueue_one_object_request (pull_data, checksum, objtype, FALSE);
}
else
{
}
out:
+ g_assert (pull_data->n_outstanding_metadata_fetches > 0);
pull_data->n_outstanding_metadata_fetches--;
pull_data->n_fetched_metadata++;
throw_async_error (pull_data, local_error);
g_variant_get_child (commit, 6, "@ay", &tree_contents_csum);
g_variant_get_child (commit, 7, "@ay", &tree_meta_csum);
- if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (tree_contents_csum),
- OSTREE_OBJECT_TYPE_DIR_TREE, recursion_depth + 1,
- cancellable, error))
+ if (!scan_one_metadata_object_c (pull_data,
+ ostree_checksum_bytes_peek (tree_contents_csum),
+ OSTREE_OBJECT_TYPE_DIR_TREE, recursion_depth + 1,
+ cancellable, error))
goto out;
- if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (tree_meta_csum),
- OSTREE_OBJECT_TYPE_DIR_META, recursion_depth + 1,
- cancellable, error))
+ if (!scan_one_metadata_object_c (pull_data,
+ ostree_checksum_bytes_peek (tree_meta_csum),
+ OSTREE_OBJECT_TYPE_DIR_META, recursion_depth + 1,
+ cancellable, error))
goto out;
ret = TRUE;
static gboolean
scan_one_metadata_object (OtPullData *pull_data,
- const guchar *csum,
+ const char *csum,
OstreeObjectType objtype,
guint recursion_depth,
GCancellable *cancellable,
GError **error)
+{
+ guchar buf[32];
+ ostree_checksum_inplace_to_bytes (csum, buf);
+
+ return scan_one_metadata_object_c (pull_data, buf, objtype,
+ recursion_depth,
+ cancellable, error);
+}
+
+static gboolean
+scan_one_metadata_object_c (OtPullData *pull_data,
+ const guchar *csum,
+ OstreeObjectType objtype,
+ guint recursion_depth,
+ GCancellable *cancellable,
+ GError **error)
{
gboolean ret = FALSE;
gs_unref_variant GVariant *object = NULL;
if (!is_stored && !is_requested)
{
char *duped_checksum = g_strdup (tmp_checksum);
+ gboolean do_fetch_detached;
+
g_hash_table_insert (pull_data->requested_metadata, duped_checksum, duped_checksum);
-
- if (objtype == OSTREE_OBJECT_TYPE_COMMIT)
- ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
- pull_worker_message_new (PULL_MSG_FETCH_DETACHED_METADATA,
- g_variant_ref (object)));
- else
- ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
- pull_worker_message_new (PULL_MSG_FETCH,
- g_variant_ref (object)));
+
+ do_fetch_detached = (objtype == OSTREE_OBJECT_TYPE_COMMIT);
+ enqueue_one_object_request (pull_data, tmp_checksum, objtype, do_fetch_detached);
}
else if (is_stored)
{
pull_data->cancellable, error))
goto out;
break;
- case OSTREE_OBJECT_TYPE_FILE:
+ default:
g_assert_not_reached ();
break;
}
}
g_hash_table_insert (pull_data->scanned_metadata, g_variant_ref (object), object);
- g_atomic_int_inc (&pull_data->n_scanned_metadata);
+ pull_data->n_scanned_metadata++;
}
ret = TRUE;
return ret;
}
-static gboolean
-scan_one_metadata_object_v_name (OtPullData *pull_data,
- GVariant *object,
- GCancellable *cancellable,
- GError **error)
-{
- OstreeObjectType objtype;
- const char *checksum = NULL;
- gs_free guchar *csum = NULL;
-
- ostree_object_name_deserialize (object, &checksum, &objtype);
- csum = ostree_checksum_to_bytes (checksum);
-
- return scan_one_metadata_object (pull_data, csum, objtype, 0,
- cancellable, error);
-}
-
-typedef struct {
- OtPullData *pull_data;
- GError *error;
-} IdleThrowErrorData;
-
-static gboolean
-idle_throw_error (gpointer user_data)
-{
- IdleThrowErrorData *data = user_data;
-
- throw_async_error (data->pull_data, data->error);
-
- g_free (data);
- return FALSE;
-}
-
-static gboolean
-on_metadata_objects_to_scan_ready (gint fd,
- GIOCondition condition,
- gpointer user_data)
-{
- OtPullData *pull_data = user_data;
- PullWorkerMessage *msg;
- PullWorkerMessage *last_idle_msg = NULL;
- GError *local_error = NULL;
- GError **error = &local_error;
-
- while (ot_waitable_queue_pop (pull_data->metadata_objects_to_scan, (gpointer*)&msg))
- {
- if (msg->t == PULL_MSG_SCAN)
- {
- if (!scan_one_metadata_object_v_name (pull_data, msg->d.item,
- pull_data->cancellable, error))
- goto out;
- g_variant_unref (msg->d.item);
- g_free (msg);
- }
- else if (msg->t == PULL_MSG_IDLE)
- {
- g_free (last_idle_msg);
- last_idle_msg = msg;
- }
- else if (msg->t == PULL_MSG_QUIT)
- {
- g_free (msg);
- g_debug ("pull: Processing PULL_MSG_QUIT");
- g_main_loop_quit (pull_data->metadata_thread_loop);
- }
- else
- g_assert_not_reached ();
- }
-
- if (last_idle_msg)
- {
- g_debug ("pull: Processing PULL_MSG_IDLE");
- ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
- last_idle_msg);
- }
-
- out:
- if (local_error)
- {
- IdleThrowErrorData *throwdata = g_new0 (IdleThrowErrorData, 1);
- throwdata->pull_data = pull_data;
- throwdata->error = local_error;
- g_main_context_invoke (NULL, idle_throw_error, throwdata);
- }
- return TRUE;
-}
-
-/**
- * metadata_thread_main:
- *
- * Called from the metadatascan worker thread. If we're missing an
- * object from one of them, we queue a request to the main thread to
- * fetch it. When it's fetched, we get passed the object back and
- * scan it.
- */
-static gpointer
-metadata_thread_main (gpointer user_data)
-{
- OtPullData *pull_data = user_data;
- GSource *src;
-
- pull_data->metadata_thread_context = g_main_context_new ();
- pull_data->metadata_thread_loop = g_main_loop_new (pull_data->metadata_thread_context, TRUE);
-
- src = ot_waitable_queue_create_source (pull_data->metadata_objects_to_scan);
- g_source_set_callback (src, (GSourceFunc)on_metadata_objects_to_scan_ready, pull_data, NULL);
- g_source_attach (src, pull_data->metadata_thread_context);
- g_source_unref (src);
-
- g_main_loop_run (pull_data->metadata_thread_loop);
- return NULL;
-}
-
static void
enqueue_one_object_request (OtPullData *pull_data,
- GVariant *object_name,
+ const char *checksum,
+ OstreeObjectType objtype,
gboolean is_detached_meta)
{
- const char *checksum;
- OstreeObjectType objtype;
SoupURI *obj_uri = NULL;
gboolean is_meta;
FetchObjectData *fetch_data;
gs_free char *objpath = NULL;
- ostree_object_name_deserialize (object_name, &checksum, &objtype);
+ g_debug ("queuing fetch of %s.%s", checksum,
+ ostree_object_type_to_string (objtype));
if (is_detached_meta)
{
}
fetch_data = g_new0 (FetchObjectData, 1);
fetch_data->pull_data = pull_data;
- fetch_data->object = g_variant_ref (object_name);
+ fetch_data->object = ostree_object_name_serialize (checksum, objtype);
fetch_data->is_detached_meta = is_detached_meta;
ostree_fetcher_request_uri_with_partial_async (pull_data->fetcher, obj_uri, pull_data->cancellable,
is_meta ? meta_fetch_on_complete : content_fetch_on_complete, fetch_data);
soup_uri_free (obj_uri);
}
-static gboolean
-on_metadata_objects_to_fetch_ready (gint fd,
- GIOCondition condition,
- gpointer user_data)
-{
- OtPullData *pull_data = user_data;
- PullWorkerMessage *msg;
-
- if (!ot_waitable_queue_pop (pull_data->metadata_objects_to_fetch, (gpointer*)&msg))
- goto out;
-
- if (msg->t == PULL_MSG_IDLE)
- {
- pull_data->checking_metadata_scan_complete = FALSE;
- if (msg->d.idle_serial == pull_data->idle_serial)
- {
- g_debug ("marking metadata scan as complete");
- pull_data->metadata_scan_complete = TRUE;
- }
- }
- else if (msg->t == PULL_MSG_FETCH || msg->t == PULL_MSG_FETCH_DETACHED_METADATA)
- {
- gboolean is_detached_meta;
-
- note_metadata_not_complete (pull_data);
-
- is_detached_meta = msg->t == PULL_MSG_FETCH_DETACHED_METADATA;
-
- enqueue_one_object_request (pull_data, msg->d.item, is_detached_meta);
-
- g_variant_unref (msg->d.item);
- }
- else
- {
- g_assert_not_reached ();
- }
- g_free (msg);
-
- out:
- check_outstanding_requests_handle_error (pull_data, NULL);
-
- return TRUE;
-}
-
static gboolean
repo_get_string_key_inherit (OstreeRepo *repo,
const char *section,
return ret;
}
-static void
-initiate_commit_scan (OtPullData *pull_data,
- const char *checksum)
-{
- ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
- pull_worker_message_new (PULL_MSG_SCAN,
- ostree_object_name_serialize (checksum, OSTREE_OBJECT_TYPE_COMMIT)));
-}
-
#if 0
static gboolean
request_static_delta_meta_sync (OtPullData *pull_data,
pull_data->gpg_verify = FALSE;
#endif
+ pull_data->phase = OSTREE_PULL_PHASE_FETCHING_REFS;
+
if (!ot_keyfile_get_boolean_with_default (config, remote_key, "tls-permissive",
FALSE, &tls_permissive, error))
goto out;
}
}
+ pull_data->phase = OSTREE_PULL_PHASE_FETCHING_OBJECTS;
+
if (!ostree_repo_prepare_transaction (pull_data->repo, &pull_data->transaction_resuming,
cancellable, error))
goto out;
g_debug ("resuming transaction: %s", pull_data->transaction_resuming ? "true" : " false");
- pull_data->metadata_objects_to_fetch = ot_waitable_queue_new ();
- pull_data->metadata_objects_to_scan = ot_waitable_queue_new ();
- pull_data->metadata_thread = g_thread_new ("metadatascan", metadata_thread_main, pull_data);
-
g_hash_table_iter_init (&hash_iter, commits_to_fetch);
while (g_hash_table_iter_next (&hash_iter, &key, &value))
{
const char *commit = value;
- initiate_commit_scan (pull_data, commit);
+ if (!scan_one_metadata_object (pull_data, commit, OSTREE_OBJECT_TYPE_COMMIT,
+ 0, pull_data->cancellable, error))
+ goto out;
}
g_hash_table_iter_init (&hash_iter, requested_refs_to_fetch);
while (g_hash_table_iter_next (&hash_iter, &key, &value))
{
const char *checksum = value;
- initiate_commit_scan (pull_data, checksum);
+ if (!scan_one_metadata_object (pull_data, checksum, OSTREE_OBJECT_TYPE_COMMIT,
+ 0, pull_data->cancellable, error))
+ goto out;
}
for (i = 0; i < pull_data->static_delta_metas->len; i++)
process_one_static_delta_meta (pull_data, pull_data->static_delta_metas->pdata[i]);
}
- {
- queue_src = ot_waitable_queue_create_source (pull_data->metadata_objects_to_fetch);
- g_source_set_callback (queue_src, (GSourceFunc)on_metadata_objects_to_fetch_ready, pull_data, NULL);
- g_source_attach (queue_src, pull_data->main_context);
- g_source_unref (queue_src);
- }
-
/* Now await work completion */
if (!run_mainloop_monitor_fetcher (pull_data))
goto out;
soup_uri_free (pull_data->base_uri);
if (queue_src)
g_source_destroy (queue_src);
- if (pull_data->metadata_thread)
- {
- ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
- pull_worker_message_new (PULL_MSG_QUIT, NULL));
- g_thread_join (pull_data->metadata_thread);
- }
g_clear_pointer (&pull_data->static_delta_metas, (GDestroyNotify) g_ptr_array_unref);
- g_clear_pointer (&pull_data->metadata_objects_to_scan, (GDestroyNotify) ot_waitable_queue_unref);
- g_clear_pointer (&pull_data->metadata_objects_to_fetch, (GDestroyNotify) ot_waitable_queue_unref);
g_clear_pointer (&pull_data->scanned_metadata, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->requested_content, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->requested_metadata, (GDestroyNotify) g_hash_table_unref);